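In data synchronization, CDC (Change Data Capture), and event-driven systems, capturing PostgreSQL data changes in real time is a core problem. pg-walstream is a PostgreSQL WAL (Write-Ahead Log) streaming library written in Rust. It implements the PostgreSQL logical replication protocol, so developers can stream and decode database change events directly and use them as the foundation of a custom CDC or data synchronization system.
PostgreSQL provides a logical replication mechanism that emits the data changes recorded in the WAL as structured events. The goals of pg-walstream are:
It is suited to the following scenarios:
pg-walstream focuses on being stable, efficient, and controllable:
pg-walstream is not a complete CDC platform. It focuses on the core logic of WAL streaming:
My other project, pg2any, is a complete CDC platform, and it also uses pg-walstream as its replication protocol library.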
┌─────────────────────────────────────────┐
│            Application Layer            │
│     (Your CDC / Replication Logic)      │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│ LogicalReplicationStream                │
│ - Connection management                 │
│ - Event processing                      │
│ - LSN feedback                          │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│ LogicalReplicationParser                │
│ - Protocol parsing                      │
│ - Message deserialization               │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│ BufferReader / BufferWriter             │
│ - Zero-copy operations                  │
│ - Binary protocol handling              │
└─────────────────────────────────────────┘
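To make the bottom two layers more concrete: on the wire, each change arrives in a CopyData payload whose XLogData (`w`) header carries three big-endian 64-bit fields (WAL start, WAL end, server send time) before the pgoutput message itself. The sketch below decodes that header with a reader that only hands out borrowed slices of the input buffer, which is the basic idea behind zero-copy parsing. `SliceReader`, `XLogDataHeader`, and `parse_xlog_data` are hypothetical simplifications for illustration, not pg-walstream's actual `BufferReader` or parser types.

```rust
/// Hypothetical, simplified stand-in for a zero-copy buffer reader: it keeps a
/// borrowed slice plus a cursor and returns sub-slices instead of copying.
struct SliceReader<'a> {
    buf: &'a [u8],
    pos: usize,
}

impl<'a> SliceReader<'a> {
    fn new(buf: &'a [u8]) -> Self {
        SliceReader { buf, pos: 0 }
    }

    /// Borrow the next `n` bytes, or return None if the buffer is too short.
    fn take(&mut self, n: usize) -> Option<&'a [u8]> {
        let buf: &'a [u8] = self.buf;
        let end = self.pos.checked_add(n)?;
        let slice = buf.get(self.pos..end)?;
        self.pos = end;
        Some(slice)
    }

    fn read_u8(&mut self) -> Option<u8> {
        self.take(1).map(|b| b[0])
    }

    /// Replication protocol integers are big-endian (network byte order).
    fn read_u64(&mut self) -> Option<u64> {
        let bytes = self.take(8)?;
        let mut arr = [0u8; 8];
        arr.copy_from_slice(bytes);
        Some(u64::from_be_bytes(arr))
    }

    /// Everything not consumed yet, still borrowed from the original buffer.
    fn rest(&mut self) -> &'a [u8] {
        let buf: &'a [u8] = self.buf;
        let rest = &buf[self.pos..];
        self.pos = buf.len();
        rest
    }
}

/// Hypothetical type for the header of an XLogData ('w') message.
#[derive(Debug, PartialEq)]
struct XLogDataHeader {
    wal_start: u64,
    wal_end: u64,
    send_time: i64, // microseconds since 2000-01-01
}

/// Split a CopyData payload into the XLogData header and the borrowed
/// pgoutput message that follows it. Returns None for other message types
/// (such as the 'k' keepalive) or for truncated input.
fn parse_xlog_data(payload: &[u8]) -> Option<(XLogDataHeader, &[u8])> {
    let mut r = SliceReader::new(payload);
    if r.read_u8()? != b'w' {
        return None;
    }
    // Struct fields are evaluated in source order, matching the wire order.
    let header = XLogDataHeader {
        wal_start: r.read_u64()?,
        wal_end: r.read_u64()?,
        send_time: r.read_u64()? as i64,
    };
    Some((header, r.rest()))
}

fn main() {
    // Fake frame: 'w', three big-endian u64 fields, then a one-byte body
    // standing in for a pgoutput message ('B' is the Begin message tag).
    let mut frame = vec![b'w'];
    frame.extend_from_slice(&0x1_0000_0028u64.to_be_bytes());
    frame.extend_from_slice(&0x1_0000_0030u64.to_be_bytes());
    frame.extend_from_slice(&42u64.to_be_bytes());
    frame.push(b'B');

    let (header, body) = parse_xlog_data(&frame).expect("well-formed frame");
    println!("{:?}, body = {:?}", header, body);
}
```

The returned body is a slice of the original frame, so the pgoutput message can be handed to the next parsing stage without allocating.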
In the Application Layer, you can:
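Add the following to Cargo.toml (the example below also needs tokio, futures, tracing, and tracing-subscriber):
[dependencies]
pg_walstream = "0.3.0"
tokio = { version = "1", features = ["full"] }
futures = "0.3"
tracing = "0.1"
tracing-subscriber = "0.3"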
The PostgreSQL client library (libpq) and build tools must also be installed on the system:
# Ubuntu / Debian
sudo apt install libpq-dev clang libclang-dev
# RHEL / CentOS / Fedora
sudo dnf install postgresql-devel
Logical replication requires the following PostgreSQL server settings in postgresql.conf (changing any of them requires a server restart):
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
Create a publication and a replication user:
CREATE PUBLICATION my_publication FOR ALL TABLES;
CREATE USER replication_user
WITH REPLICATION
PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public
TO replication_user;
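The example below falls back to connecting as `postgres`. To stream as the `replication_user` created above, the connection string needs that user's credentials plus `replication=database`, which asks the server for a walsender connection bound to a specific database, as logical replication requires. A minimal sketch, assuming the local host, port, and database used in the example; `replication_url` and `percent_encode` are hypothetical helpers, and the encoding step keeps characters such as `@` or `:` in a password from breaking the URI:

```rust
/// Hypothetical helper: percent-encode every byte outside the RFC 3986
/// "unreserved" set, so '@', ':' or '/' in a password cannot break the URI.
fn percent_encode(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for b in s.bytes() {
        match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(b as char)
            }
            _ => out.push_str(&format!("%{:02X}", b)),
        }
    }
    out
}

/// Hypothetical helper: build a libpq connection URI for a logical replication
/// session. `replication=database` requests a walsender connection bound to
/// `dbname`, which logical replication needs.
fn replication_url(user: &str, password: &str, host: &str, port: u16, dbname: &str) -> String {
    format!(
        "postgresql://{}:{}@{}:{}/{}?replication=database",
        percent_encode(user),
        percent_encode(password),
        host,
        port,
        percent_encode(dbname)
    )
}

fn main() {
    // Credentials from the CREATE USER statement above; in practice, read the
    // password from a secret store instead of hard-coding it.
    let url = replication_url("replication_user", "secure_password", "localhost", 5432, "postgres");
    println!("DATABASE_URL={}", url);
}
```

Exporting the printed value as `DATABASE_URL` makes the example connect as the dedicated replication user instead of the superuser default.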
The following minimal, working async example shows how to connect with pg-walstream and receive WAL events.
use futures::stream::{self, StreamExt};
use pg_walstream::{
    CancellationToken, LogicalReplicationStream, ReplicationStreamConfig, RetryConfig,
};
use std::env;
use std::time::Duration;
use tracing::{error, info, Level};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize logging
    tracing_subscriber::fmt()
        .with_max_level(Level::INFO)
        .with_target(false)
        .init();
    info!("Starting PostgreSQL WAL streaming example");
    // Get connection string from environment or use default
    let connection_string = env::var("DATABASE_URL").unwrap_or_else(|_| {
        "postgresql://postgres:password@localhost:5432/postgres?replication=database".to_string()
    });
    info!("Connection string: {}", mask_password(&connection_string));
    // Configure the replication stream
    let config = ReplicationStreamConfig::new(
        "example_slot".to_string(),   // Replication slot name
        "my_publication".to_string(), // Publication name (must exist)
        2,                            // Protocol version (2 supports streaming)
        true,                         // Enable streaming of large transactions
        Duration::from_secs(10),      // Send feedback every 10 seconds
        Duration::from_secs(30),      // Connection timeout
        Duration::from_secs(60),      // Health check interval
        RetryConfig::default(),       // Use default retry strategy
    );
    info!("Creating replication stream...");
    // Create and initialize the stream
    let mut stream = LogicalReplicationStream::new(&connection_string, config).await?;
    info!("Stream created successfully");
    // Start replication from the latest position (None = latest)
    stream.start(None).await?;
    info!("Replication started successfully");
    info!("Listening for changes... (Press Ctrl+C to stop)");
    info!("You can now make changes to your database tables to see events");
    // Create cancellation token for graceful shutdown
    let cancel_token = CancellationToken::new();
    let cancel_token_clone = cancel_token.clone();
    // Setup Ctrl+C handler
    tokio::spawn(async move {
        tokio::signal::ctrl_c()
            .await
            .expect("Failed to listen for ctrl-c");
        info!("Received shutdown signal, cleaning up...");
        cancel_token_clone.cancel();
    });
    // Convert to EventStream
    let event_stream = stream.into_stream(cancel_token);
    // Wrap the EventStream with futures::stream::unfold to get a futures::Stream,
    // which makes combinators such as filter, map and take available.
    // The unfold stream is !Unpin, so Box::pin pins it on the heap; that lets
    // us call StreamExt::next on it repeatedly.
    let mut pg_stream = Box::pin(stream::unfold(
        event_stream,
        |mut event_stream| async move {
            match event_stream.next().await {
                Ok(event) => {
                    // Acknowledge the event's LSN. This example does it on receipt;
                    // a real pipeline should do it only after the event has been
                    // durably handled, so a crash cannot acknowledge lost work.
                    event_stream.update_applied_lsn(event.lsn.value());
                    Some((Ok(event), event_stream))
                }
                Err(e) => {
                    // Yield the error to the consumer; the loop below breaks on it,
                    // which drops the stream.
                    Some((Err(e), event_stream))
                }
            }
        },
    ));
    // Consume the stream; combinators could be chained onto pg_stream here
    info!("Using futures::Stream combinators for advanced processing");
    while let Some(result) = pg_stream.as_mut().next().await {
        match result {
            Ok(event) => {
                info!("Received event: {:?}", event);
            }
            Err(e) => {
                error!("Error: {}", e);
                break;
            }
        }
    }
    info!("Graceful shutdown complete");
    Ok(())
}
/// Mask password in connection string for logging
fn mask_password(conn_str: &str) -> String {
    if let Some(proto_end) = conn_str.find("://") {
        let proto = &conn_str[..proto_end + 3]; // e.g., "postgresql://"
        let rest = &conn_str[proto_end + 3..];
        if let Some(at_pos) = rest.find('@') {
            let credentials = &rest[..at_pos];
            let after_at = &rest[at_pos..];
            if let Some(colon_pos) = credentials.find(':') {
                let user = &credentials[..colon_pos];
                return format!("{}{}:****{}", proto, user, after_at);
            }
        }
    }
    conn_str.to_string()
}
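The example passes `event.lsn.value()`, a raw `u64`, to `update_applied_lsn`, while PostgreSQL itself displays LSNs (for instance in `pg_replication_slots.confirmed_flush_lsn`) as two hexadecimal 32-bit halves such as `16/B374D848`. When logging positions or comparing them against values seen in `psql`, it helps to convert between the two forms. A minimal std-only sketch; `format_lsn` and `parse_lsn` are hypothetical helpers, not part of pg-walstream's API:

```rust
/// Hypothetical helper: format a 64-bit LSN the way PostgreSQL prints it,
/// i.e. the high and low 32 bits in upper-case hex separated by '/'.
fn format_lsn(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}

/// Hypothetical helper: parse PostgreSQL's "X/Y" text form back into a u64.
/// Returns None if either half is missing or is not valid 32-bit hex.
fn parse_lsn(s: &str) -> Option<u64> {
    let (hi, lo) = s.split_once('/')?;
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some((u64::from(hi) << 32) | u64::from(lo))
}

fn main() {
    // For example, a value obtained from event.lsn.value()
    let lsn: u64 = 0x16_B374_D848;
    let text = format_lsn(lsn);
    println!("{} is {}", lsn, text);
    // Round-trip: the text form parses back to the same number.
    assert_eq!(parse_lsn(&text), Some(lsn));
}
```

Because an LSN is a byte position in the WAL, the `u64` form orders the same way as the positions themselves, so checking whether an event lies past a saved checkpoint is a plain integer comparison.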
More examples are available at https://github.com/isdaniel/pg-walstream/tree/main/examples
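The `Duration::from_secs(10)` feedback interval in the example refers to the standby status updates a replication client sends back to the server. In the streaming replication protocol, this is a CopyData payload that starts with `r`, followed by the written, flushed, and applied WAL positions, the client clock in microseconds since 2000-01-01, and a reply-requested flag; for logical slots, the server uses the reported flush position to advance the slot's `confirmed_flush_lsn`. pg-walstream builds and sends this message itself. The sketch below only illustrates the wire format, and `encode_standby_status_update` and `pg_timestamp_micros` are hypothetical names.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Seconds from the Unix epoch (1970-01-01) to the PostgreSQL epoch (2000-01-01).
const PG_EPOCH_OFFSET_SECS: u64 = 946_684_800;

/// Hypothetical helper: client clock as microseconds since 2000-01-01, the unit
/// the protocol uses. Times before 2000 are clamped to 0 in this sketch.
fn pg_timestamp_micros(now: SystemTime) -> i64 {
    let since_unix = now.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO);
    let since_pg = since_unix.saturating_sub(Duration::from_secs(PG_EPOCH_OFFSET_SECS));
    since_pg.as_micros() as i64
}

/// Hypothetical helper: encode a Standby Status Update ('r') payload.
/// Layout: Byte1('r'), Int64 written, Int64 flushed, Int64 applied,
/// Int64 client clock, Byte1 reply-requested; integers are big-endian.
fn encode_standby_status_update(
    written: u64,
    flushed: u64,
    applied: u64,
    now: SystemTime,
    reply_requested: bool,
) -> Vec<u8> {
    let mut msg = Vec::with_capacity(34);
    msg.push(b'r');
    msg.extend_from_slice(&written.to_be_bytes());
    msg.extend_from_slice(&flushed.to_be_bytes());
    msg.extend_from_slice(&applied.to_be_bytes());
    msg.extend_from_slice(&pg_timestamp_micros(now).to_be_bytes());
    msg.push(u8::from(reply_requested));
    msg
}

fn main() {
    // Report that all WAL before LSN 0/1000 has been written, flushed and applied.
    let msg = encode_standby_status_update(0x1000, 0x1000, 0x1000, SystemTime::now(), false);
    println!("{} bytes: {:02X?}", msg.len(), msg);
}
```

Each position means "last WAL byte handled, plus one", which is why acknowledging an LSN too early, before the change is durably processed, can let the server discard WAL the consumer still needs.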
pg-walstream provides a clean, extensible Rust API that lets developers work directly with PostgreSQL logical replication.
If you are:
then pg-walstream is a good place to start.
Project link:
https://github.com/isdaniel/pg-walstream